/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.solr.cloud.api.collections;

import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;

import com.google.common.annotations.VisibleForTesting;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaCount;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reindex a collection, usually in order to change the index schema.
 *
 * <p>WARNING: Reindexing is potentially a lossy operation - some indexed data that is not available
 * as stored fields may be irretrievably lost, so users should use this command with caution,
 * evaluating the potential impact by using different source and target collection names first, and
 * preserving the source collection until the evaluation is complete.
 *
 * <p>Reindexing follows these steps:
 *
 * <ol>
 *   <li>create the target collection with the specified name (or, if the target name was not
 *       specified or is the same as the source collection, a unique sequentially-named
 *       collection), using the configset of the source collection (or the one specified in the
 *       parameters, which must already exist) and the shape of the original collection, unless
 *       overridden by parameters. A temporary checkpoint collection is created as well.
 *   <li>put the source collection in read-only mode.
 *   <li>copy the source documents to the target collection, using their stored fields and
 *       reindexing them using the target's schema. NOTE: some data loss may occur if the original
 *       stored field data is not available!
 *   <li>if the source and target collection names were the same, set up an alias pointing from
 *       the source collection name to the actual (sequentially named) target collection.
 *   <li>delete the temporary checkpoint collection.
 *   <li>optionally delete the source collection; otherwise clear its read-only flag.
 * </ol>
 */
public class ReindexCollectionCmd implements CollApiCmds.CollectionApiCommand {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /** Message parameter selecting the sub-command ({@link Cmd}); defaults to {@code start}. */
  public static final String COMMAND = "cmd";
  /** Key under which the reindexing state map is added to the command results. */
  public static final String REINDEX_STATUS = "reindexStatus";
  /** Boolean message parameter: delete the source collection after a successful copy. */
  public static final String REMOVE_SOURCE = "removeSource";
  /** Message parameter naming the target collection; defaults to the source collection. */
  public static final String TARGET = "target";
  /** Name prefix of the sequentially-named target used when the target equals the source. */
  public static final String TARGET_COL_PREFIX = ".rx_";
  /** Name prefix of the checkpoint collection used by the document-copying topic stream. */
  public static final String CHK_COL_PREFIX = ".rx_ck_";
  /** Collection property set to {@code finished} on the target once reindexing completes. */
  public static final String REINDEXING_STATE = CollectionAdminRequest.PROPERTY_PREFIX + "rx";

  /** Key of the {@link State} value in the persisted reindexing state map. */
  public static final String STATE = "state";
  /** Key of the human-readable progress description in the reindexing state map. */
  public static final String PHASE = "phase";

  /**
   * Message parameters that, when present, are copied verbatim into the CREATE command of the
   * target collection (overriding the values derived from the source collection).
   */
  private static final List<String> COLLECTION_PARAMS =
      Stream.concat(
              CollectionHandlingUtils.numReplicasProperties().stream(),
              Stream.of(
                  ZkStateReader.CONFIGNAME_PROP,
                  ZkStateReader.NUM_SHARDS_PROP,
                  ZkStateReader.REPLICATION_FACTOR,
                  "shards",
                  CollectionAdminParams.CREATE_NODE_SET_PARAM,
                  CollectionAdminParams.CREATE_NODE_SET_SHUFFLE_PARAM))
          .collect(Collectors.toUnmodifiableList());

  private final CollectionCommandContext ccc;

  /**
   * Sequence (static, shared by all instances) used to build unique target collection names with
   * {@link #TARGET_COL_PREFIX}. NOTE(review): never reassigned in the visible code; could be final.
   */
  private static AtomicInteger tmpCollectionSeq = new AtomicInteger();

  /** Lifecycle states of a reindexing operation, persisted in lower-case form. */
  public enum State {
    IDLE,
    RUNNING,
    ABORTED,
    FINISHED;

    /** Lookup table from the lower-cased constant name to the constant itself. */
    static final Map<String, State> states =
        Stream.of(values()).collect(Collectors.toUnmodifiableMap(State::toLower, s -> s));

    /**
     * Returns the persisted form of this state.
     *
     * @return the constant name lower-cased with {@link Locale#ROOT}
     */
    public String toLower() {
      return name().toLowerCase(Locale.ROOT);
    }

    /**
     * Parses a state value case-insensitively.
     *
     * @param p value to parse; converted with {@link String#valueOf(Object)}
     * @return the matching state, or {@code null} if {@code p} is null or not recognized
     */
    public static State get(Object p) {
      return p == null ? null : states.get(String.valueOf(p).toLowerCase(Locale.ROOT));
    }
  }

  /** Sub-commands accepted through the {@code cmd} message parameter. */
  public enum Cmd {
    START,
    ABORT,
    STATUS;

    /** Lookup table from the lower-cased constant name to the constant itself. */
    static final Map<String, Cmd> cmds =
        Stream.of(values()).collect(Collectors.toUnmodifiableMap(Cmd::toLower, c -> c));

    /**
     * Returns the lower-case form of this sub-command.
     *
     * @return the constant name lower-cased with {@link Locale#ROOT}
     */
    public String toLower() {
      return name().toLowerCase(Locale.ROOT);
    }

    /**
     * Parses a sub-command name case-insensitively.
     *
     * @param p sub-command name, may be {@code null}
     * @return the matching sub-command, or {@code null} if {@code p} is null or not recognized
     */
    public static Cmd get(String p) {
      if (p != null) {
        return cmds.get(p.toLowerCase(Locale.ROOT));
      }
      return null;
    }
  }

  /**
   * Creates the command.
   *
   * @param ccc context giving access to the cloud manager, ZooKeeper state reader, core container
   *     and cluster state update mechanisms used while reindexing
   */
  public ReindexCollectionCmd(CollectionCommandContext ccc) {
    this.ccc = ccc;
  }

  /**
   * Executes the reindexing sub-command selected by the {@code cmd} message parameter.
   *
   * <ul>
   *   <li>{@code status}: adds the current reindexing state of the source collection to {@code
   *       results}.
   *   <li>{@code abort}: if a reindexing is running, persists the ABORTED state; the running
   *       instance of this command notices it and performs the cleanup. Does nothing (and adds
   *       nothing to {@code results}) when no reindexing is running.
   *   <li>{@code start} (default): creates the target and checkpoint collections, puts the source
   *       in read-only mode, copies the documents with a streaming daemon, optionally sets up an
   *       alias, and finally removes the source or clears its read-only flag.
   * </ul>
   *
   * <p>Exceptions thrown inside the main phase of {@code start} are caught: partial work is
   * cleaned up, the error is reported under {@code "error"} in {@code results}, and the reported
   * state is set to aborted instead of re-throwing.
   *
   * @param clusterState cluster state at the time the command was received
   * @param message command parameters (source name, target, removeSource, q, fl, rows, ...)
   * @param results receives the {@code reindexStatus} map (and {@code error} on failure)
   * @throws Exception for invalid parameters, an already-running reindexing, or failures while
   *     reading the persisted reindexing state before the main phase starts
   */
  @Override
  public void call(ClusterState clusterState, ZkNodeProps message, NamedList<Object> results)
      throws Exception {

    log.debug("*** called: {}", message);

    String extCollection = message.getStr(CommonParams.NAME);

    if (extCollection == null) {
      throw new SolrException(
          SolrException.ErrorCode.BAD_REQUEST, "Source collection name must be specified");
    }
    // resolve the (possibly aliased) external name to the actual source collection
    boolean followAliases = message.getBool(FOLLOW_ALIASES, false);
    String collection;
    if (followAliases) {
      collection =
          ccc.getSolrCloudManager().getClusterStateProvider().resolveSimpleAlias(extCollection);
    } else {
      collection = extCollection;
    }
    if (!clusterState.hasCollection(collection)) {
      throw new SolrException(
          SolrException.ErrorCode.BAD_REQUEST, "Source collection name must exist");
    }
    // the target defaults to the source; a target equal to the source means a sequentially
    // named collection (plus an alias) will be used instead
    String target = message.getStr(TARGET);
    if (target == null) {
      target = collection;
    } else {
      if (followAliases) {
        target = ccc.getSolrCloudManager().getClusterStateProvider().resolveSimpleAlias(target);
      }
    }
    boolean sameTarget = target.equals(collection) || target.equals(extCollection);
    boolean removeSource = message.getBool(REMOVE_SOURCE, false);
    Cmd command = Cmd.get(message.getStr(COMMAND, Cmd.START.toLower()));
    if (command == null) {
      throw new SolrException(
          SolrException.ErrorCode.BAD_REQUEST, "Unknown command: " + message.getStr(COMMAND));
    }
    // load the persisted state; a collection without any stored state is treated as IDLE
    Map<String, Object> reindexingState =
        getReindexingState(ccc.getSolrCloudManager().getDistribStateManager(), collection);
    if (!reindexingState.containsKey(STATE)) {
      reindexingState.put(STATE, State.IDLE.toLower());
    }
    State state = State.get(reindexingState.get(STATE));
    if (command == Cmd.ABORT) {
      log.info("Abort requested for collection {}, setting the state to ABORTED.", collection);
      // check that it's running
      if (state != State.RUNNING) {
        log.debug(
            "Abort requested for collection {} but command is not running: {}", collection, state);
        return;
      }
      setReindexingState(collection, State.ABORTED, null);
      reindexingState.put(STATE, "aborting");
      results.add(REINDEX_STATUS, reindexingState);
      // if needed the cleanup will be performed by the running instance of the command
      return;
    } else if (command == Cmd.STATUS) {
      results.add(REINDEX_STATUS, reindexingState);
      return;
    }
    // command == Cmd.START

    // check it's not already running
    if (state == State.RUNNING) {
      throw new SolrException(
          SolrException.ErrorCode.BAD_REQUEST,
          "Reindex is already running for collection "
              + collection
              + ". If you are sure this is not the case you can issue &cmd=abort to clean up this state.");
    }

    // copy parameters; the shape of the target defaults to that of the source collection
    DocCollection coll = clusterState.getCollection(collection);
    boolean aborted = false;
    int batchSize = message.getInt(CommonParams.ROWS, 100);
    String query = message.getStr(CommonParams.Q, "*:*");
    String fl = message.getStr(CommonParams.FL, "*");
    Integer rf = message.getInt(ZkStateReader.REPLICATION_FACTOR, coll.getReplicationFactor());
    ReplicaCount numReplicas = ReplicaCount.fromMessage(message, coll);
    int numShards = message.getInt(ZkStateReader.NUM_SHARDS_PROP, coll.getActiveSlices().size());
    DocRouter router = coll.getRouter();
    if (router == null) {
      router = DocRouter.DEFAULT;
    }

    String configName = message.getStr(ZkStateReader.CONFIGNAME_PROP, coll.getConfigName());
    String targetCollection;
    int seq = tmpCollectionSeq.getAndIncrement();
    if (sameTarget) {
      // pick the first sequential name not already used by an existing collection
      do {
        targetCollection = TARGET_COL_PREFIX + extCollection + "_" + seq;
        if (!clusterState.hasCollection(targetCollection)) {
          break;
        }
        seq = tmpCollectionSeq.getAndIncrement();
      } while (clusterState.hasCollection(targetCollection));
    } else {
      targetCollection = target;
    }
    String chkCollection = CHK_COL_PREFIX + extCollection;
    String daemonUrl = null;
    Replica daemonReplica = null;
    Exception exc = null;
    // tells cleanup() whether the target collection was created by this invocation
    boolean createdTarget = false;
    try {
      // set the running flag
      reindexingState.clear();
      reindexingState.put("actualSourceCollection", collection);
      reindexingState.put("actualTargetCollection", targetCollection);
      reindexingState.put("checkpointCollection", chkCollection);
      reindexingState.put("inputDocs", getNumberOfDocs(collection));
      reindexingState.put(PHASE, "creating target and checkpoint collections");
      setReindexingState(collection, State.RUNNING, reindexingState);

      // 0. set up target and checkpoint collections
      NamedList<Object> cmdResults = new NamedList<>();
      ZkNodeProps cmd;
      if (clusterState.hasCollection(targetCollection)) {
        throw new SolrException(
            SolrException.ErrorCode.SERVER_ERROR,
            "Target collection " + targetCollection + " already exists! Delete it first.");
      }
      if (clusterState.hasCollection(chkCollection)) {
        // delete the checkpoint collection
        cmd =
            new ZkNodeProps(
                Overseer.QUEUE_OPERATION,
                CollectionParams.CollectionAction.DELETE.toLower(),
                CommonParams.NAME,
                chkCollection);
        new DeleteCollectionCmd(ccc).call(clusterState, cmd, cmdResults);
        CollectionHandlingUtils.checkResults(
            "deleting old checkpoint collection " + chkCollection, cmdResults, true);
      }

      if (maybeAbort(collection)) {
        aborted = true;
        return;
      }

      Map<String, Object> propMap = new HashMap<>();
      propMap.put(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower());
      propMap.put(CommonParams.NAME, targetCollection);
      propMap.put(ZkStateReader.NUM_SHARDS_PROP, numShards);
      propMap.put(CollectionAdminParams.COLL_CONF, configName);
      // init first from the same router
      propMap.put("router.name", router.getName());
      for (String key : coll.keySet()) {
        if (key.startsWith("router.")) {
          propMap.put(key, coll.get(key));
        }
      }
      // then apply overrides if present
      for (String key : message.keySet()) {
        if (key.startsWith("router.")) {
          propMap.put(key, message.getStr(key));
        } else if (COLLECTION_PARAMS.contains(key)) {
          propMap.put(key, message.get(key));
        }
      }

      propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, true);
      if (rf != null) {
        propMap.put(ZkStateReader.REPLICATION_FACTOR, rf);
      }
      numReplicas.writeProps(propMap);
      // create the target collection
      cmd = new ZkNodeProps(propMap);
      cmdResults = new NamedList<>();
      new CreateCollectionCmd(ccc).call(clusterState, cmd, cmdResults);
      // set before checking the results so that a partially created target is cleaned up too
      createdTarget = true;
      CollectionHandlingUtils.checkResults(
          "creating target collection " + targetCollection, cmdResults, true);

      // create the checkpoint collection - use RF=1 and 1 shard
      cmd =
          new ZkNodeProps(
              Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
              CommonParams.NAME, chkCollection,
              ZkStateReader.NUM_SHARDS_PROP, "1",
              ZkStateReader.REPLICATION_FACTOR, "1",
              CollectionAdminParams.COLL_CONF, "_default",
              CommonAdminParams.WAIT_FOR_FINAL_STATE, "true");
      cmdResults = new NamedList<>();
      new CreateCollectionCmd(ccc).call(clusterState, cmd, cmdResults);
      CollectionHandlingUtils.checkResults(
          "creating checkpoint collection " + chkCollection, cmdResults, true);
      // wait for a while until we see both collections
      try {
        for (String col : List.of(targetCollection, chkCollection)) {
          ccc.getZkStateReader().waitForState(col, 30, TimeUnit.SECONDS, Objects::nonNull);
        }
      } catch (TimeoutException e) {
        throw new SolrException(
            SolrException.ErrorCode.SERVER_ERROR, "Could not fully create temporary collection(s)");
      }

      // refresh so that later sub-commands see the newly created collections
      clusterState = ccc.getSolrCloudManager().getClusterState();

      if (maybeAbort(collection)) {
        aborted = true;
        return;
      }

      // 1. put the source collection in read-only mode
      cmd =
          new ZkNodeProps(
              Overseer.QUEUE_OPERATION,
              CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
              ZkStateReader.COLLECTION_PROP,
              collection,
              ZkStateReader.READ_ONLY,
              "true");
      if (ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
        ccc.getDistributedClusterStateUpdater()
            .doSingleStateUpdate(
                DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection,
                cmd,
                ccc.getSolrCloudManager(),
                ccc.getZkStateReader());
      } else {
        ccc.offerStateUpdate(cmd);
      }

      TestInjection.injectReindexLatch();

      if (maybeAbort(collection)) {
        aborted = true;
        return;
      }

      // 2. copy the documents to target
      // Recipe taken from:
      // http://joelsolr.blogspot.com/2016/10/solr-63-batch-jobs-parallel-etl-and.html
      // A terminating daemon runs a topic() over the source (checkpointed in chkCollection) and
      // sends batches to update() + commit() on the target collection.
      ModifiableSolrParams q = new ModifiableSolrParams();
      q.set(CommonParams.QT, "/stream");
      q.set("collection", collection);
      q.set(
          "expr",
          "daemon(id=\""
              + targetCollection
              + "\","
              + "terminate=\"true\","
              + "commit("
              + targetCollection
              + ","
              + "update("
              + targetCollection
              + ","
              + "batchSize="
              + batchSize
              + ","
              + "topic("
              + chkCollection
              + ","
              + collection
              + ","
              + "q=\""
              + query
              + "\","
              + "fl=\""
              + fl
              + "\","
              + "id=\"topic_"
              + targetCollection
              + "\","
              + "rows=\""
              + batchSize
              + "\","
              + "initialCheckpoint=\"0\"))))");
      log.debug("- starting copying documents from {} to {}", collection, targetCollection);
      SolrResponse rsp;
      try {
        rsp = new QueryRequest(q).process(ccc.getSolrCloudManager().getSolrClient());
      } catch (Exception e) {
        throw new SolrException(
            SolrException.ErrorCode.SERVER_ERROR,
            "Unable to copy documents from " + collection + " to " + targetCollection,
            e);
      }
      // the daemon's status can only be queried on the core where it runs
      daemonReplica = getReplicaForDaemon(rsp, coll);
      if (daemonReplica == null) {
        throw new SolrException(
            SolrException.ErrorCode.SERVER_ERROR,
            "Unable to copy documents from "
                + collection
                + " to "
                + targetCollection
                + ": "
                + Utils.toJSONString(rsp));
      }
      reindexingState.put("daemonUrl", daemonReplica.getCoreUrl());
      reindexingState.put("daemonName", targetCollection);
      reindexingState.put(PHASE, "copying documents");
      setReindexingState(collection, State.RUNNING, reindexingState);

      // wait for the daemon to finish
      waitForDaemon(targetCollection, daemonReplica, collection, targetCollection, reindexingState);
      if (maybeAbort(collection)) {
        aborted = true;
        return;
      }
      log.debug("- finished copying from {} to {}", collection, targetCollection);
      // fail here or earlier during daemon run
      TestInjection.injectReindexFailure();

      // 5. if (sameTarget) set up an alias to use targetCollection as the source name
      if (sameTarget) {
        log.debug("- setting up alias from {} to {}", extCollection, targetCollection);
        cmd = new ZkNodeProps(CommonParams.NAME, extCollection, "collections", targetCollection);
        cmdResults = new NamedList<>();
        new CreateAliasCmd(ccc).call(clusterState, cmd, cmdResults);
        CollectionHandlingUtils.checkResults(
            "setting up alias " + extCollection + " -> " + targetCollection, cmdResults, true);
        reindexingState.put("alias", extCollection + " -> " + targetCollection);
      }

      reindexingState.remove("daemonUrl");
      reindexingState.remove("daemonName");
      reindexingState.put("processedDocs", getNumberOfDocs(targetCollection));
      reindexingState.put(PHASE, "copying done, finalizing");
      setReindexingState(collection, State.RUNNING, reindexingState);

      if (maybeAbort(collection)) {
        aborted = true;
        return;
      }
      // 6. delete the checkpoint collection
      log.debug("- deleting {}", chkCollection);
      cmd =
          new ZkNodeProps(
              Overseer.QUEUE_OPERATION,
              CollectionParams.CollectionAction.DELETE.toLower(),
              CommonParams.NAME,
              chkCollection);
      cmdResults = new NamedList<>();
      new DeleteCollectionCmd(ccc).call(clusterState, cmd, cmdResults);
      CollectionHandlingUtils.checkResults(
          "deleting checkpoint collection " + chkCollection, cmdResults, true);

      // 7. optionally delete the source collection
      if (removeSource) {
        log.debug("- deleting source collection");
        cmd =
            new ZkNodeProps(
                Overseer.QUEUE_OPERATION,
                CollectionParams.CollectionAction.DELETE.toLower(),
                CommonParams.NAME,
                collection,
                FOLLOW_ALIASES,
                "false");
        cmdResults = new NamedList<>();
        new DeleteCollectionCmd(ccc).call(clusterState, cmd, cmdResults);
        CollectionHandlingUtils.checkResults(
            "deleting source collection " + collection, cmdResults, true);
      } else {
        // 8. clear readOnly on source
        ZkNodeProps props =
            new ZkNodeProps(
                Overseer.QUEUE_OPERATION,
                CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
                ZkStateReader.COLLECTION_PROP,
                collection,
                ZkStateReader.READ_ONLY,
                null);
        if (ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
          ccc.getDistributedClusterStateUpdater()
              .doSingleStateUpdate(
                  DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection,
                  props,
                  ccc.getSolrCloudManager(),
                  ccc.getZkStateReader());
        } else {
          ccc.offerStateUpdate(props);
        }
      }
      // 9. set FINISHED state on the target and clear the state on the source
      ZkNodeProps props =
          new ZkNodeProps(
              Overseer.QUEUE_OPERATION,
              CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
              ZkStateReader.COLLECTION_PROP,
              targetCollection,
              REINDEXING_STATE,
              State.FINISHED.toLower());
      if (ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
        ccc.getDistributedClusterStateUpdater()
            .doSingleStateUpdate(
                DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection,
                props,
                ccc.getSolrCloudManager(),
                ccc.getZkStateReader());
      } else {
        ccc.offerStateUpdate(props);
      }

      reindexingState.put(STATE, State.FINISHED.toLower());
      reindexingState.put(PHASE, "done");
      removeReindexingState(collection);
    } catch (Exception e) {
      // errors are reported in the results rather than re-thrown; cleanup happens in finally
      log.warn("Error during reindexing of {}", extCollection, e);
      exc = e;
      aborted = true;
    } finally {
      // runs for the early "return" abort paths above as well as for caught exceptions
      if (aborted) {
        cleanup(
            collection,
            targetCollection,
            chkCollection,
            daemonReplica,
            targetCollection,
            createdTarget);
        if (exc != null) {
          results.add("error", exc.toString());
        }
        reindexingState.put(STATE, State.ABORTED.toLower());
      }
      results.add(REINDEX_STATUS, reindexingState);
    }
  }

  private static final String REINDEXING_STATE_PATH = "/.reindexing";

  /**
   * Persists the reindexing state of {@code collection} under its collection znode.
   *
   * @param collection collection whose state node is written (created if missing)
   * @param state state value stored under the {@code state} key
   * @param props properties to store; when {@code null} the currently stored properties are
   *     re-used
   * @return the map that was written, i.e. a copy of the properties with the state applied
   */
  private Map<String, Object> setReindexingState(
      String collection, State state, Map<String, Object> props) throws Exception {
    final DistribStateManager distribStateManager =
        ccc.getSolrCloudManager().getDistribStateManager();
    final String statePath =
        ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH;
    // never mutate the caller's map: work on a fresh copy
    final Map<String, Object> merged =
        new HashMap<>(props != null ? props : distribStateManager.getJson(statePath));
    merged.put(STATE, state.toLower());
    final var payload = Utils.toJSON(merged);
    if (!distribStateManager.hasData(statePath)) {
      distribStateManager.makePath(statePath, payload, CreateMode.PERSISTENT, false);
    } else {
      distribStateManager.setData(statePath, payload, -1);
    }
    return merged;
  }

  /**
   * Deletes the persisted reindexing state node of {@code collection}, if there is one.
   *
   * @param collection collection whose state node is removed
   */
  private void removeReindexingState(String collection) throws Exception {
    final DistribStateManager distribStateManager =
        ccc.getSolrCloudManager().getDistribStateManager();
    final String statePath =
        ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH;
    if (!distribStateManager.hasData(statePath)) {
      return; // nothing stored
    }
    distribStateManager.removeData(statePath, -1);
  }

  /**
   * Reads the persisted reindexing state of {@code collection}.
   *
   * @param stateManager state manager used to read the state node
   * @param collection collection whose state is read
   * @return a new, mutable, key-sorted copy of the stored properties
   */
  @VisibleForTesting
  public static Map<String, Object> getReindexingState(
      DistribStateManager stateManager, String collection) throws Exception {
    final String statePath =
        ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + REINDEXING_STATE_PATH;
    final Map<String, Object> stored = stateManager.getJson(statePath);
    // copy into a TreeMap so that callers may freely modify the result
    return new TreeMap<>(stored);
  }

  /**
   * Counts all documents ({@code *:*}) in the given collection.
   *
   * <p>The count only feeds the {@code inputDocs}/{@code processedDocs} statistics of the
   * reindexing state, so failures are tolerated: the exception is logged and {@code 0} is returned
   * instead of failing the reindexing.
   *
   * @param collection name of the collection to count
   * @return the number of documents found, or 0 if the count could not be obtained
   */
  private long getNumberOfDocs(String collection) {
    var solrClient = ccc.getCoreContainer().getZkController().getSolrClient();
    try {
      ModifiableSolrParams params = new ModifiableSolrParams();
      params.add(CommonParams.Q, "*:*");
      params.add(CommonParams.ROWS, "0");
      QueryResponse rsp = solrClient.query(collection, params);
      return rsp.getResults().getNumFound();
    } catch (Exception e) {
      // best-effort statistic: don't fail the reindexing, but don't hide the cause either
      log.warn("Unable to count documents in collection {}, reporting 0", collection, e);
      return 0L;
    }
  }

  /**
   * Checks whether the running reindexing of {@code collection} should stop.
   *
   * @param collection the source collection being reindexed
   * @return {@code true} if the collection no longer exists or its persisted reindexing state is
   *     ABORTED; {@code false} otherwise (a missing state value counts as RUNNING)
   */
  private boolean maybeAbort(String collection) throws Exception {
    final var cloudManager = ccc.getSolrCloudManager();
    if (cloudManager.getClusterState().getCollectionOrNull(collection) == null) {
      // collection no longer present - abort
      log.info("## Aborting - collection {} no longer present.", collection);
      return true;
    }
    final Map<String, Object> persisted =
        getReindexingState(cloudManager.getDistribStateManager(), collection);
    final State current = State.get(persisted.getOrDefault(STATE, State.RUNNING.toLower()));
    if (current == State.ABORTED) {
      log.info("## Aborting - collection {} state is {}", collection, current);
      return true;
    }
    return false;
  }

  /**
   * Finds the replica on which the streaming daemon was started, by parsing the {@code DaemonOp}
   * entries of the response to the daemon request.
   *
   * <p>XXX see #waitForDaemon() for why we need this: the daemon status has to be queried on the
   * actual SolrCore where the daemon runs.
   *
   * @param rsp response to the {@code /stream} daemon request
   * @param coll collection whose replicas are searched for the daemon's core
   * @return the replica hosting the daemon, or {@code null} if the response lacks plausible daemon
   *     location information or no replica of {@code coll} has that core name
   */
  private Replica getReplicaForDaemon(SolrResponse rsp, DocCollection coll) {
    @SuppressWarnings({"unchecked"})
    Map<String, Object> rs = (Map<String, Object>) rsp.getResponse().get("result-set");
    if (rs == null || rs.isEmpty()) {
      if (log.isDebugEnabled()) {
        log.debug(" -- Missing daemon information in response: {}", Utils.toJSONString(rsp));
      }
      // nothing to parse - falling through would dereference a null result-set
      return null;
    }
    @SuppressWarnings({"unchecked"})
    List<Object> list = (List<Object>) rs.get("docs");
    if (list == null) {
      if (log.isDebugEnabled()) {
        log.debug(" -- Missing daemon information in response: {}", Utils.toJSONString(rsp));
      }
      return null;
    }
    String replicaName = null;
    for (Object o : list) {
      @SuppressWarnings({"unchecked"})
      Map<String, Object> map = (Map<String, Object>) o;
      String op = (String) map.get("DaemonOp");
      if (op == null) {
        continue;
      }
      // expected 4 whitespace-separated tokens, the last one being the core name
      // (presumably "<daemon id> <verb> on <coreName>")
      String[] parts = op.split("\\s+");
      if (parts.length != 4) {
        log.debug(" -- Invalid daemon location info, expected 4 tokens: {}", op);
        return null;
      }
      // check if it's plausible
      if (parts[3].contains("shard") && parts[3].contains("replica")) {
        replicaName = parts[3];
        break;
      } else {
        log.debug(" -- daemon location info likely invalid: {}", op);
        return null;
      }
    }
    if (replicaName == null) {
      return null;
    }
    // find the replica whose core hosts the daemon
    for (Replica r : coll.getReplicas()) {
      if (replicaName.equals(r.getCoreName())) {
        return r;
      }
    }
    return null;
  }

  // XXX currently this is complicated due to a bug in the way the daemon 'list'
  // XXX operation is implemented - see SOLR-13245. We need to query the actual
  // XXX SolrCore where the daemon is running
  /**
   * Blocks until the streaming daemon {@code daemonName} no longer appears in the daemon list of
   * {@code daemonReplica}'s core, or until the reindexing of {@code sourceCollection} is aborted.
   *
   * <p>The daemon list is polled with a sleep of 2000 ms (via the cloud manager's time source)
   * between polls. Every 5th poll, the current document count of {@code targetCollection} is
   * stored as {@code processedDocs} and the state of {@code sourceCollection} is re-persisted as
   * RUNNING. An empty daemon list ends the wait immediately.
   *
   * @param daemonName id of the daemon to wait for
   * @param daemonReplica replica whose core runs the daemon
   * @param sourceCollection collection being reindexed (used for abort checks and state updates)
   * @param targetCollection collection whose document count is reported as progress
   * @param reindexingState state map updated with progress and persisted periodically
   * @throws SolrException wrapping any failure while listing the daemons
   */
  @SuppressWarnings({"unchecked"})
  private void waitForDaemon(
      String daemonName,
      Replica daemonReplica,
      String sourceCollection,
      String targetCollection,
      Map<String, Object> reindexingState)
      throws Exception {

    boolean isRunning;
    int statusCheck = 0;
    do {
      isRunning = false;
      statusCheck++;
      try {
        NamedList<Object> rsp = executeDaemonAction("list", daemonName, daemonReplica);
        Map<String, Object> rs = (Map<String, Object>) rsp.get("result-set");
        if (rs == null || rs.isEmpty()) {
          throw new SolrException(
              SolrException.ErrorCode.SERVER_ERROR,
              "Can't find daemon list: missing result-set: " + Utils.toJSONString(rsp));
        }
        List<Object> list = (List<Object>) rs.get("docs");
        if (list == null) {
          throw new SolrException(
              SolrException.ErrorCode.SERVER_ERROR,
              "Can't find daemon list: missing result-set: " + Utils.toJSONString(rsp));
        }
        if (list.isEmpty()) { // finished?
          break;
        }
        // the daemon is still running as long as an entry with its id is listed
        for (Object o : list) {
          Map<String, Object> map = (Map<String, Object>) o;
          String id = (String) map.get("id");
          if (daemonName.equals(id)) {
            isRunning = true;
            // fail here
            TestInjection.injectReindexFailure();
            break;
          }
        }
      } catch (Exception e) {
        // note: also wraps the SolrExceptions thrown above and any injected test failure
        throw new SolrException(
            SolrException.ErrorCode.SERVER_ERROR,
            "Exception waiting for daemon " + daemonName + " at " + daemonReplica.getCoreUrl(),
            e);
      }
      // periodically publish progress
      if (statusCheck % 5 == 0) {
        reindexingState.put("processedDocs", getNumberOfDocs(targetCollection));
        setReindexingState(sourceCollection, State.RUNNING, reindexingState);
      }
      ccc.getSolrCloudManager().getTimeSource().sleep(2000);
    } while (isRunning && !maybeAbort(sourceCollection));
  }

  /**
   * Stops the streaming daemon {@code daemonName} running on {@code daemonReplica}'s core and then
   * removes its status entry.
   *
   * <p>A {@code stop} action is sent first. If its response confirms the daemon was stopped, the
   * daemon list is polled (pausing between polls) for up to 60 seconds until the daemon disappears
   * or reports a positive {@code stopTime}. Malformed responses and time-outs are logged; on a
   * malformed {@code stop} response the method returns without sending {@code kill}. Otherwise a
   * final {@code kill} action removes the daemon's status.
   *
   * @param daemonName id of the daemon to stop
   * @param daemonReplica replica whose core hosts the daemon
   * @throws Exception if a daemon request fails or the polling sleep is interrupted
   */
  @SuppressWarnings({"unchecked"})
  private void killDaemon(String daemonName, Replica daemonReplica) throws Exception {
    if (log.isDebugEnabled()) {
      log.debug("-- killing daemon {} at {}", daemonName, daemonReplica.getCoreUrl());
    }

    // we should really use 'kill' here, but then we will never
    // know when the daemon actually finishes running - 'kill' only
    // sets a flag that may be noticed much later
    NamedList<Object> rsp = executeDaemonAction("stop", daemonName, daemonReplica);
    // /result-set/docs/[0]/DaemonOp : Deamon:id stopped on coreName
    if (log.isDebugEnabled()) {
      log.debug(" -- stop daemon response: {}", Utils.toJSONString(rsp));
    }
    Map<String, Object> rs = (Map<String, Object>) rsp.get("result-set");
    if (rs == null || rs.isEmpty()) {
      log.warn(
          "Problem killing daemon {}: missing result-set: {}", daemonName, Utils.toJSONString(rsp));
      return;
    }
    List<Object> list = (List<Object>) rs.get("docs");
    if (list == null) {
      log.warn(
          "Problem killing daemon {}: missing result-set: {}", daemonName, Utils.toJSONString(rsp));
      return;
    }
    if (list.isEmpty()) { // already finished?
      return;
    }
    for (Object o : list) {
      Map<String, Object> map = (Map<String, Object>) o;
      String op = (String) map.get("DaemonOp");
      if (op == null) {
        continue;
      }
      if (op.contains(daemonName) && op.contains("stopped")) {
        // now wait for the daemon to really stop
        TimeOut timeOut =
            new TimeOut(60, TimeUnit.SECONDS, ccc.getSolrCloudManager().getTimeSource());
        while (!timeOut.hasTimedOut()) {
          rsp = executeDaemonAction("list", daemonName, daemonReplica);
          rs = (Map<String, Object>) rsp.get("result-set");
          if (rs == null || rs.isEmpty()) {
            log.warn(
                "Problem killing daemon {}: missing result-set: {}",
                daemonName,
                Utils.toJSONString(rsp));
            break;
          }
          List<Object> list2 = (List<Object>) rs.get("docs");
          if (list2 == null) {
            log.warn(
                "Problem killing daemon {}: missing result-set: {}",
                daemonName,
                Utils.toJSONString(rsp));
            break;
          }
          if (list2.isEmpty()) { // already finished?
            break;
          }
          Map<String, Object> status2 = null;
          for (Object o2 : list2) {
            Map<String, Object> map2 = (Map<String, Object>) o2;
            if (daemonName.equals(map2.get("id"))) {
              status2 = map2;
              break;
            }
          }
          if (status2 == null) { // finished?
            break;
          }
          // a missing stopTime means the daemon has not reported stopping yet
          Number stopTime = (Number) status2.get("stopTime");
          if (stopTime != null && stopTime.longValue() > 0) {
            break;
          }
          // pause between polls instead of flooding the core with back-to-back 'list' requests
          ccc.getSolrCloudManager().getTimeSource().sleep(250);
        }
        if (timeOut.hasTimedOut()) {
          log.warn("Problem killing daemon {}: timed out waiting for daemon to stop.", daemonName);
          // proceed anyway
        }
      }
    }
    // now kill it - it's already stopped, this simply removes its status
    executeDaemonAction("kill", daemonName, daemonReplica);
  }

  /**
   * Sends a non-distributed {@code /stream} admin request carrying the given daemon {@code action}
   * (e.g. {@code stop}, {@code list}, {@code kill}) to the core that hosts the daemon.
   *
   * @param action daemon control action to send
   * @param daemonName id of the daemon the action targets
   * @param daemonReplica replica whose base URL and core name receive the request
   * @return the raw response of the {@code /stream} handler
   * @throws Exception if the request fails
   */
  private NamedList<Object> executeDaemonAction(
      String action, String daemonName, Replica daemonReplica) throws Exception {
    final var client = ccc.getCoreContainer().getDefaultHttpSolrClient();

    // distrib=false: the action must be handled by this exact core only
    final ModifiableSolrParams params = new ModifiableSolrParams();
    params.set("action", action);
    params.set(CommonParams.ID, daemonName);
    params.set(CommonParams.DISTRIB, false);

    final GenericSolrRequest request =
        new GenericSolrRequest(
            SolrRequest.METHOD.POST, "/stream", SolrRequest.SolrRequestType.ADMIN, params);
    request.setRequiresCollection(true);

    final String baseUrl = daemonReplica.getBaseUrl();
    final String coreName = daemonReplica.getCoreName();
    return client.requestWithBaseUrl(baseUrl, request, coreName);
  }

  /**
   * Undoes the side effects of an aborted or failed reindexing run.
   *
   * <p>Steps, in order:
   *
   * <ol>
   *   <li>kill the daemon (best-effort: a failure is logged and cleanup continues, so the source
   *       collection is not left read-only just because the daemon's node is unreachable);
   *   <li>delete the target collection, but only if this run created it, it differs from the source
   *       collection and it still exists;
   *   <li>delete the checkpoint collection if it exists;
   *   <li>turn read-only mode off for the source collection and remove its reindexing state.
   * </ol>
   *
   * @param collection source collection being reindexed
   * @param targetCollection target collection of the reindexing
   * @param chkCollection checkpoint collection used by the daemon
   * @param daemonReplica replica hosting the daemon, or {@code null} if no daemon was started
   * @param daemonName id of the daemon
   * @param createdTarget whether this run created {@code targetCollection}
   * @throws Exception if deleting a collection or updating the collection state fails, or if
   *     interrupted while killing the daemon
   */
  private void cleanup(
      String collection,
      String targetCollection,
      String chkCollection,
      Replica daemonReplica,
      String daemonName,
      boolean createdTarget)
      throws Exception {
    log.info("## Cleaning up after abort or error");

    if (daemonReplica != null) {
      try {
        killDaemon(daemonName, daemonReplica);
      } catch (InterruptedException e) {
        throw e;
      } catch (Exception e) {
        // the daemon's node may be gone after the very error we're cleaning up from; don't let
        // that prevent restoring the source collection's state below
        log.warn(
            "Problem killing daemon {} during cleanup, proceeding with remaining cleanup steps",
            daemonName,
            e);
      }
    }
    ClusterState clusterState = ccc.getSolrCloudManager().getClusterState();
    if (createdTarget
        && !collection.equals(targetCollection)
        && clusterState.hasCollection(targetCollection)) {
      deleteCollectionDuringCleanup(clusterState, targetCollection, "target collection");
    }
    // remove chk collection
    if (clusterState.hasCollection(chkCollection)) {
      deleteCollectionDuringCleanup(clusterState, chkCollection, "checkpoint collection");
    }
    log.debug(" -- turning readOnly mode off for {}", collection);
    ZkNodeProps props =
        new ZkNodeProps(
            Overseer.QUEUE_OPERATION,
            CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
            ZkStateReader.COLLECTION_PROP,
            collection,
            ZkStateReader.READ_ONLY,
            null);
    if (ccc.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
      ccc.getDistributedClusterStateUpdater()
          .doSingleStateUpdate(
              DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection,
              props,
              ccc.getSolrCloudManager(),
              ccc.getZkStateReader());
    } else {
      ccc.offerStateUpdate(props);
    }
    removeReindexingState(collection);
  }

  /**
   * Deletes {@code collectionName} (without following aliases) as a cleanup step. Failures
   * reported in the command results are checked non-fatally under the label {@code "CLEANUP:
   * deleting <description> <collectionName>"}.
   */
  private void deleteCollectionDuringCleanup(
      ClusterState clusterState, String collectionName, String description) throws Exception {
    log.debug(" -- removing {}", collectionName);
    ZkNodeProps cmd =
        new ZkNodeProps(
            Overseer.QUEUE_OPERATION,
            CollectionParams.CollectionAction.DELETE.toLower(),
            CommonParams.NAME,
            collectionName,
            FOLLOW_ALIASES,
            "false");
    NamedList<Object> cmdResults = new NamedList<>();
    new DeleteCollectionCmd(ccc).call(clusterState, cmd, cmdResults);
    CollectionHandlingUtils.checkResults(
        "CLEANUP: deleting " + description + " " + collectionName, cmdResults, false);
  }
}
